// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include <assert.h>
#include <time.h>

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <mutex>
#include <thread>

#include "pure_mem/epoche.h"
#include "pure_mem/abstract_version_node.h"
#include "pure_mem/key_index/art/rowex/art_n.h"

namespace rocksdb {

    // Builds a deletion label for `n`.
    //   n    - raw pointer that delLabel() will later release.
    //   type - selects which release routine delLabel() uses for `n`.
    // The CLOCK_REALTIME time at construction is stored in timestamp_.
    LabelDelete::LabelDelete(void *n, NODE_TYPE type)
        : node_(n),
          type_(type) {
        clock_gettime(CLOCK_REALTIME, &this->timestamp_);
    }

    void LabelDelete::clone(const LabelDelete& other){
        this->node_ = other.node_;
        this->timestamp_ = other.timestamp_;
        this->type_ = other.type_;
    }

    // Releases the memory referenced by node_ with the routine selected by
    // type_:
    //   DELETION_TYPE_ART      -> art_rowex::N::deleteNode
    //   DELETION_TYPE_VERSION  -> delete as rocksdb::VersionNode
    //   DELETION_TYPE_NEWARRAY -> delete[] as char array
    //   DELETION_TYPE_MALLOC   -> free
    // Any other type value trips an assert (a no-op when NDEBUG is defined).
    // node_ is not reset afterwards, so calling this twice on the same label
    // (or on a clone of it) releases the same pointer twice.
    void LabelDelete::delLabel(){
        switch (this->type_)
        {
        case DELETION_TYPE_ART:
            // DeletionList::removeLast() returns a label with a null node_ and
            // this type when its queue is empty. delete/delete[]/free accept
            // nullptr, but deleteNode's handling of null is not visible here,
            // so skip the call instead of relying on it.
            if (this->node_ != nullptr) {
                art_rowex::N::deleteNode(static_cast<art_rowex::N*>(this->node_));
            }
            break;

        case DELETION_TYPE_VERSION:
            delete static_cast<rocksdb::VersionNode*>(this->node_);
            break;

        case DELETION_TYPE_NEWARRAY:
            delete [] static_cast<char*>(this->node_);
            break;

        case DELETION_TYPE_MALLOC:
            free(this->node_);
            break;

        default:
            // Unknown deletion type: programming error.
            assert(false);
            break;
        }
    }


    inline bool DeletionList::empty() const{
        return this->wait4freeList_.empty();
    }

    // Removes the entry at the FRONT of wait4freeList_ (despite the name) and
    // returns a copy of it.
    // If the queue is empty, nothing is removed and the returned label has
    // node_ == nullptr and type DELETION_TYPE_ART.
    // mutex_ is held via a lock_guard for the whole queue access, so it is
    // released on every exit path.
    inline LabelDelete DeletionList::removeLast() {
        LabelDelete ret(nullptr, DELETION_TYPE_ART);
        std::lock_guard<decltype(this->mutex_)> guard(this->mutex_);
        if (!this->wait4freeList_.empty()){
            ret.clone(this->wait4freeList_.front());
            this->wait4freeList_.pop();
        }
        return ret;
    }

    void DeletionList::add(void *n, NODE_TYPE type) {
        this->mutex_.lock();
        this->wait4freeList_.push(LabelDelete(n, type));
        this->mutex_.unlock();
    }

    // Out-of-class definition of the static ins_ pointer; it starts out null.
    // NOTE(review): presumably the slot for a shared/singleton instance —
    // confirm against the class declaration.
    DeleteWhileNoRefs* DeleteWhileNoRefs::ins_ = nullptr;
    void DeleteWhileNoRefs::markNodeForDeletion(void *n, NODE_TYPE type) {
        deleteList_.add(n, type);
    }
    // Loop run by the clean_ thread. Each iteration:
    //   * if closed_ is set: releases every entry still queued (without
    //     waiting out the grace period) and returns;
    //   * if the list is empty: sleeps startGCThreshhold_ seconds and retries;
    //   * otherwise: takes one entry, sleeps until at least
    //     startGCThreshhold_ seconds have passed since the entry's timestamp,
    //     then releases it via delLabel().
    // closed_ is only examined at the top of the loop, so a shutdown request
    // is noticed after the current sleep (at most the grace period) finishes.
    void DeleteWhileNoRefs::cleanup(){
        while(true){
            if (this->closed_){
                while(!this->deleteList_.empty()){
                    LabelDelete node = this->deleteList_.removeLast();
                    node.delLabel();
                }
                return;
            }
            if (this->deleteList_.empty()){
                std::this_thread::sleep_for(std::chrono::seconds(this->startGCThreshhold_));
                continue;
            }
            LabelDelete node = this->deleteList_.removeLast();
            struct timespec curT;
            clock_gettime(CLOCK_REALTIME, &curT);
            // Remaining grace time in milliseconds:
            //   grace period - (now - entry timestamp).
            long milse = (this->startGCThreshhold_ - curT.tv_sec + node.timestamp_.tv_sec ) * 1000 - (curT.tv_nsec - node.timestamp_.tv_nsec) / 1000000;
            // Both timestamps come from CLOCK_REALTIME, which can be stepped
            // backwards (e.g. by a time adjustment). That would make the
            // entry look as if it were created in the future and produce an
            // arbitrarily long sleep, so never wait longer than one full
            // grace period.
            const long graceMs = static_cast<long>(this->startGCThreshhold_) * 1000;
            if (milse > graceMs) {
                milse = graceMs;
            }
            if (milse >= 1) {
                std::this_thread::sleep_for(std::chrono::milliseconds(milse));
            }
            node.delLabel();
        }
    }
    // Constructs clean_ with cleanup() bound to this object, which starts the
    // background reclamation loop.
    // NOTE(review): the remaining members are initialised from declarations
    // not visible in this file — confirm clean_ is declared after every
    // member that cleanup() reads.
    DeleteWhileNoRefs::DeleteWhileNoRefs()
        : clean_(&DeleteWhileNoRefs::cleanup, this) {
    }
    // Requests shutdown and waits for the cleanup loop to finish.
    DeleteWhileNoRefs::~DeleteWhileNoRefs(){
        // cleanup() checks this flag at the top of its loop; once it sees it
        // set, it releases whatever is still queued and returns.
        this->closed_ = true;
        // Block until cleanup() has returned.
        this->clean_.join();
    }
}